package com.ws.gc.web.auth;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class ReactorDataMonitor {
    private static final Map<Class, FluxSink> handlers = new ConcurrentHashMap<>();

    /**
     * 监控指定类型的数据
     *
     * @param clz     数据类型
     * @param handler 数据消费方式
     */
    public static void monitor(Class clz, Consumer handler) {
        Flux<Object> objectFlux = Flux.create(sink -> {
            handlers.put(clz, sink);
            sink.onCancel(() -> handlers.remove(clz));
        }, FluxSink.OverflowStrategy.LATEST);

        objectFlux.subscribe(handler);
    }

    /**
     * 取消监控数据
     *
     * @param clz 数据类型
     */
    public static void unMonitor(Class clz) {
        handlers.remove(clz);
    }

    /**
     * 发布数据
     *
     * @param object
     */
    public static void publish(Object object) {
        handlers.forEach((key, value) -> {
            if (key.equals(object.getClass())) {
                value.next(object);
            }
        });
    }

    public static void main(String[] args) {
        monitor(Integer.class, System.out::println);
        unMonitor(Integer.class);
        publish(9);
    }
}
